Async Performance Patterns
The One-Line Change That Costs 200ms Per Request
These two FastAPI endpoint implementations look nearly identical:
```python
# Version A: shared client (correct)
from contextlib import asynccontextmanager
from typing import Optional

import httpx
from fastapi import FastAPI

_http_client: Optional[httpx.AsyncClient] = None  # created once in lifespan, reused by every request

@asynccontextmanager
async def lifespan(app: FastAPI):
    global _http_client
    _http_client = httpx.AsyncClient(
        timeout=httpx.Timeout(10.0),
        limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
    )
    yield
    await _http_client.aclose()

app = FastAPI(lifespan=lifespan)

@app.get("/api/price/{ticker}")
async def get_price_v1(ticker: str):
    response = await _http_client.get(f"https://api.prices.internal/{ticker}")
    return response.json()

# Version B: new client per request (wrong)
@app.get("/api/price-v2/{ticker}")
async def get_price_v2(ticker: str):
    async with httpx.AsyncClient() as client:  # NEW CLIENT (AND EMPTY POOL) EVERY REQUEST
        response = await client.get(f"https://api.prices.internal/{ticker}")
    return response.json()
```
Under load at 500 req/s:
| Metric | Version A (shared client) | Version B (new client/req) |
|---|---|---|
| Median latency | 8ms | 218ms |
| p99 latency | 23ms | 891ms |
| Throughput | 500 req/s | 112 req/s |
| Error rate | 0% | 12% (connection timeouts) |
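The difference is connection establishment. Every request in Version B builds a new client with an empty connection pool, so it must negotiate a new TCP connection (plus a TLS handshake for HTTPS). That costs roughly two extra round trips before the request is even sent: one for TCP and at least one more for TLS. At ~100ms per round trip between distant datacenters, that adds up to ~200ms. Version A reuses already-open connections from its pool. At 500 req/s, 200ms of setup per request means about 100 connections are being opened and torn down at any moment (500 × 0.2s), and the service cannot keep up.
This is the most common async performance mistake in production Python services. It is followed closely by a half-dozen others, all covered in this lesson.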
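The handshake cost can be made visible even without network latency. The following stdlib-only sketch replaces HTTP with a toy newline-delimited echo protocol on localhost; run_demo and its helpers are illustrative names, not part of any library. It counts how many TCP connections the server accepts when a client reuses one connection, versus when it opens a fresh connection per request.

```python
import asyncio

async def run_demo(n_requests: int = 20) -> tuple[int, int]:
    """Return (connections accepted with one reused connection,
    connections accepted with a new connection per request)."""
    accepted = 0  # TCP connections the server has accepted so far

    async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        nonlocal accepted
        accepted += 1
        while line := await reader.readline():  # echo one line per "request" until EOF
            writer.write(line)
            await writer.drain()
        writer.close()
        await writer.wait_closed()

    async def send(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, i: int) -> None:
        writer.write(f"request {i}\n".encode())
        await writer.drain()
        await reader.readline()  # wait for the echoed "response"

    server = await asyncio.start_server(handle, "127.0.0.1", 0)  # port 0: any free port
    port = server.sockets[0].getsockname()[1]
    async with server:
        # Like Version A: one connection, reused for every request
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        for i in range(n_requests):
            await send(reader, writer, i)
        writer.close()
        await writer.wait_closed()
        reused = accepted

        # Like Version B: a new connection (and handshake) for every request
        for i in range(n_requests):
            reader, writer = await asyncio.open_connection("127.0.0.1", port)
            await send(reader, writer, i)
            writer.close()
            await writer.wait_closed()
        fresh = accepted - reused
    return reused, fresh

reused, fresh = asyncio.run(run_demo())
print(f"reused connection: {reused} accepted; new connection per request: {fresh} accepted")
# reused connection: 1 accepted; new connection per request: 20 accepted
```

On localhost each handshake is cheap. Across datacenters, each of those extra accepts costs at least one network round trip.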
What You Will Learn
- Understand asyncio's event loop internals well enough to diagnose stalls
- Drop in uvloop for a 2–4x event loop speedup
- Configure and manage connection pools correctly for HTTP clients and databases
- Use asyncio.Semaphore to cap concurrent outbound requests
- Compare asyncio.gather, asyncio.TaskGroup, and sequential execution for correctness and performance
- Implement backpressure with asyncio.Queue to prevent OOM under load spikes
- Run CPU-bound and blocking I/O code correctly from async context
- Profile and debug stalled asyncio tasks in production
Prerequisites
| Requirement | Level Needed |
|---|---|
| Python async/await syntax | Comfortable |
| asyncio.gather basics | Familiar |
| HTTP and TCP fundamentals | Helpful |
| FastAPI or similar async framework | Helpful |
Section 1: asyncio Event Loop Internals
Understanding what the event loop actually does prevents a large class of async performance bugs.
The Event Loop Cycle
asyncio runs a single-threaded event loop. The core loop (simplified from CPython source) is:
```text
_run_once():
    1. Compute the poll timeout: 0 if callbacks are already ready, otherwise
       the time until the next scheduled timer (no timeout if there is none)
    2. Call the selector: epoll_wait() / kqueue() (Windows uses IOCP via the proactor loop)
       → This blocks the thread (but wakes on I/O or timeout)
    3. Queue the callbacks of every I/O-ready file descriptor
    4. Move scheduled callbacks whose deadline has passed onto the ready queue
    5. Execute every callback in the ready queue
    6. Repeat forever (run_forever() calls _run_once() in a loop)
```
Visualised:
```text
Event Loop Thread
│
├── epoll_wait(fd_set, timeout=0.003s)
│     ↑ blocks here until I/O ready or timeout
│
├── socket.read() ready  → schedule http_response_callback
├── socket.write() ready → write next chunk
│
├── Execute callbacks:
│     http_response_callback() → resumes coroutine A
│     timer_callback()         → resumes coroutine B
│
└── ← repeat ────────────────────────────────────────────
```
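Critical implication: the event loop is single-threaded. A blocking call anywhere in a coroutine blocks ALL coroutines. There is no preemption: a coroutine runs until it reaches an await that actually suspends (waiting on I/O, a timer, a lock, or another task). Awaiting a coroutine that finishes without suspending does not give other coroutines a turn.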
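Here is a small sketch of what "no preemption" means in practice; the coroutine names are illustrative. Awaiting a coroutine that completes without suspending simply runs it inline. Another ready task only gets a turn when something genuinely suspends, such as asyncio.sleep(0).

```python
import asyncio

async def instant() -> int:
    # Completes without ever suspending: no I/O, no timer, no sleep
    return 1

async def busy(log: list[str]) -> None:
    log.append("busy start")
    for _ in range(3):
        await instant()  # executes inline; the loop never gets control back
    log.append("busy end")

async def cooperative(log: list[str]) -> None:
    log.append("coop start")
    await asyncio.sleep(0)  # genuinely suspends for one loop iteration
    log.append("coop end")

async def scenario(first) -> list[str]:
    log: list[str] = []

    async def other() -> None:
        log.append("other ran")

    # Both tasks are scheduled in creation order and start on the next loop iteration
    await asyncio.gather(asyncio.create_task(first(log)), asyncio.create_task(other()))
    return log

print(asyncio.run(scenario(busy)))         # ['busy start', 'busy end', 'other ran']
print(asyncio.run(scenario(cooperative)))  # ['coop start', 'other ran', 'coop end']
```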
The Blocking Call Trap
```python
import asyncio
import time

async def fetch_price(ticker: str) -> float:
    await asyncio.sleep(0.001)  # simulates network I/O
    return 42.0

async def handle_request(request_id: int) -> dict:
    price = await fetch_price("AAPL")
    # BUG: time.sleep() blocks the ENTIRE event loop.
    # Every other coroutine freezes for 0.5 seconds.
    time.sleep(0.5)  # ← never do this in a coroutine (use await asyncio.sleep(0.5))
    return {"id": request_id, "price": price}

async def bad_service():
    # 100 "concurrent" requests, but each time.sleep() freezes all of them,
    # so the sleeps run back to back: ~100 × 0.5s = ~50 seconds in total
    tasks = [handle_request(i) for i in range(100)]
    await asyncio.gather(*tasks)
```
Other blocking calls that freeze the event loop:
- requests.get() (synchronous HTTP)
- open() followed by f.read() (synchronous file I/O)
- time.sleep() (synchronous sleep)
- CPU-heavy computation (no await, so it never yields)
- subprocess.run() (synchronous subprocess)
- Any blocking socket.* call made without await
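To see the cost, the sketch below runs ten concurrent handlers that each "wait" 0.1s in three different ways; the handler names are illustrative.

- With time.sleep().
- With await asyncio.sleep().
- With the blocking call moved to a worker thread via asyncio.to_thread().

The blocking version serialises everything, while the other two overlap.

```python
import asyncio
import time

async def blocking_handler() -> None:
    time.sleep(0.1)  # blocks the whole event loop thread

async def non_blocking_handler() -> None:
    await asyncio.sleep(0.1)  # suspends only this coroutine

async def offloaded_handler() -> None:
    await asyncio.to_thread(time.sleep, 0.1)  # blocking call moved to a worker thread

async def timed(handler, n: int = 10) -> float:
    """Run n copies of handler concurrently and return the wall-clock time."""
    start = time.perf_counter()
    await asyncio.gather(*(handler() for _ in range(n)))
    return time.perf_counter() - start

timings = {h.__name__: asyncio.run(timed(h))
           for h in (blocking_handler, non_blocking_handler, offloaded_handler)}
for name, seconds in timings.items():
    print(f"{name}: {seconds:.2f}s")
```

On a typical machine the blocking version takes about ten times longer than the other two. The to_thread version is also bounded by the size of the default thread pool.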
Section 2: uvloop - Drop-In Event Loop Speedup
uvloop is a fast implementation of the asyncio event loop built on top of libuv (the same C library powering Node.js). It is a drop-in replacement requiring one line of code.
```shell
pip install uvloop
```

```python
import asyncio
import uvloop

async def main():
    ...

# Option 1: replace the loop policy globally, before any asyncio usage
# (the asyncio policy API is deprecated as of Python 3.14; prefer Option 2 in new code)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# Option 2: use uvloop's runner (uvloop 0.18+)
uvloop.run(main())

# Option 3: create and drive a specific loop manually
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(main())
finally:
    loop.close()
```
With FastAPI / Uvicorn
```shell
# uvicorn's default (--loop auto) uses uvloop when it is installed
pip install "uvicorn[standard]"   # installs uvloop as a dependency (not on Windows)
uvicorn myapp:app --loop uvloop   # or request uvloop explicitly
```
Benchmark: asyncio vs uvloop
```python
import asyncio
import time

import uvloop

async def echo_many():
    """Stress the scheduler: 100k yields with no real I/O (measures loop overhead)."""
    for _ in range(100_000):
        await asyncio.sleep(0)  # yield control 100k times

# Standard asyncio
loop = asyncio.new_event_loop()
start = time.perf_counter()
loop.run_until_complete(echo_many())
asyncio_time = time.perf_counter() - start
loop.close()
print(f"asyncio: {asyncio_time:.3f}s")

# uvloop
loop = uvloop.new_event_loop()
start = time.perf_counter()
loop.run_until_complete(echo_many())
uvloop_time = time.perf_counter() - start
loop.close()
print(f"uvloop: {uvloop_time:.3f}s")

print(f"Speedup: {asyncio_time / uvloop_time:.1f}x")
```

```text
asyncio: 1.847s
uvloop: 0.612s
Speedup: 3.0x
```
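uvloop is typically 2–4x faster for I/O-heavy workloads. The speedup comes from libuv's efficient I/O multiplexing and tighter callback dispatch, so it applies to time spent inside the event loop itself. For services with thousands of concurrent connections, loop overhead is a large share of each request, and the speedup shows up as higher throughput. When most of the time goes to your own Python code or to waiting on downstream services, the end-to-end gain is smaller.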
Section 3: Connection Pooling
Connection pools maintain a set of pre-established connections to a server. Instead of creating a new TCP connection per request (expensive), a connection from the pool is borrowed, used, and returned. This is the single highest-leverage async optimisation for most production services.
HTTP Client: httpx.AsyncClient
```python
import httpx
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI
from typing import Optional

# Module-level shared client - one instance for the entire application
_http_client: Optional[httpx.AsyncClient] = None

def get_http_client() -> httpx.AsyncClient:
    """FastAPI dependency that returns the shared HTTP client."""
    if _http_client is None:
        raise RuntimeError("HTTP client not initialised - check lifespan setup")
    return _http_client

@asynccontextmanager
async def lifespan(app: FastAPI):
    global _http_client
    _http_client = httpx.AsyncClient(
        timeout=httpx.Timeout(
            connect=2.0,  # TCP connection timeout
            read=10.0,    # Response read timeout
            write=5.0,    # Request write timeout
            pool=2.0,     # Pool acquisition timeout
        ),
        limits=httpx.Limits(
            max_connections=200,           # total concurrent connections
            max_keepalive_connections=50,  # persistent connections to keep alive
            keepalive_expiry=30.0,         # how long to keep idle connections
        ),
        headers={
            "User-Agent": "MyService/1.0",
            "Accept-Encoding": "gzip, deflate",
        },
    )
    yield
    await _http_client.aclose()

app = FastAPI(lifespan=lifespan)
```
Database Connection Pool: SQLAlchemy Async
```python
from typing import AsyncIterator

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/mydb"

engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,        # maintain 20 persistent connections
    max_overflow=10,     # allow up to 10 extra connections under load
    pool_timeout=30,     # wait up to 30s for a connection from the pool
    pool_recycle=3600,   # recycle connections after 1 hour (avoid stale connections)
    pool_pre_ping=True,  # test connection health before use
    echo=False,
)

# async_sessionmaker (SQLAlchemy 2.0+) is the async-aware session factory
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

async def get_db() -> AsyncIterator[AsyncSession]:
    """FastAPI dependency: yields a session, commits on success, rolls back on error."""
    async with AsyncSessionLocal() as session:  # closing the session returns its connection to the pool
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
```
The Pool Exhaustion Trap
Pool exhaustion: all connections in the pool are in use and a new request must wait. If the wait exceeds pool_timeout, an exception is raised.
```python
import asyncio
import httpx

async def demonstrate_pool_exhaustion():
    """Shows what happens when the pool is too small for the concurrency."""
    async with httpx.AsyncClient(
        limits=httpx.Limits(
            max_connections=5,  # pool of 5
            max_keepalive_connections=5,
        ),
        # httpx.Timeout needs a default (10s here) unless all four values are set;
        # pool=1.0: wait at most 1s to get a connection from the pool
        timeout=httpx.Timeout(10.0, pool=1.0),
    ) as client:

        async def fetch(i):
            # Each response takes ~2 seconds. With a 5-connection pool and 10
            # concurrent requests, 5 requests wait longer than 1s and time out.
            response = await client.get("https://httpbin.org/delay/2")  # any slow endpoint
            return response.status_code

        results = await asyncio.gather(*(fetch(i) for i in range(10)), return_exceptions=True)
        timeouts = sum(isinstance(r, httpx.PoolTimeout) for r in results)
        print(f"Pool exhausted for {timeouts} of {len(results)} requests")

asyncio.run(demonstrate_pool_exhaustion())
```
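The same dynamics can be reproduced offline. In this sketch, a semaphore with an acquisition timeout stands in for the connection pool. ToyPool is a hypothetical class, not httpx's implementation. The numbers are scaled down: 5 slots, a 0.1s acquisition timeout, and each request holding its slot for 0.3s.

```python
import asyncio

class ToyPool:
    """Hypothetical stand-in for a connection pool: a fixed number of slots
    and a timeout on acquiring one (analogous to a pool timeout)."""

    def __init__(self, size: int, acquire_timeout: float) -> None:
        self._slots = asyncio.Semaphore(size)
        self._acquire_timeout = acquire_timeout

    async def request(self, duration: float) -> str:
        try:
            await asyncio.wait_for(self._slots.acquire(), self._acquire_timeout)
        except asyncio.TimeoutError:
            return "pool timeout"  # all slots stayed busy for the whole timeout
        try:
            await asyncio.sleep(duration)  # the "connection" is held for the whole request
            return "ok"
        finally:
            self._slots.release()

async def main() -> list[str]:
    pool = ToyPool(size=5, acquire_timeout=0.1)
    # 10 concurrent requests that each hold a slot for 0.3s: 5 get a slot, 5 time out
    return await asyncio.gather(*(pool.request(0.3) for _ in range(10)))

results = asyncio.run(main())
print(results.count("ok"), results.count("pool timeout"))  # 5 5
```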
Diagnosing pool exhaustion: pool exhaustion manifests as PoolTimeout exceptions that spike during high load. To detect it before it causes errors, expose pool metrics:
```python
# httpx pool statistics: relies on PRIVATE httpx/httpcore attributes that can
# change between releases; treat it as a debugging aid, not a stable API
def log_pool_stats(client: httpx.AsyncClient) -> None:
    pool = client._transport._pool
    connections = pool._connections
    print(f"Connections: {len(connections)}/{pool._max_connections}")
    print(f"Idle: {sum(1 for c in connections if c.is_idle())}")

# SQLAlchemy pool statistics (public QueuePool methods)
def log_db_pool_stats() -> None:
    pool = engine.pool
    print(f"Pool size: {pool.size()}")
    print(f"Checked in (idle): {pool.checkedin()}")
    print(f"Checked out: {pool.checkedout()}")
    print(f"Overflow: {pool.overflow()}")
```
Section 4: Semaphore for Rate Limiting
Even with a large connection pool, you may need to cap concurrent outbound requests to prevent overwhelming a downstream service (or to respect rate limits).
```python
import asyncio
import httpx

async def fetch_with_semaphore(
    client: httpx.AsyncClient,
    semaphore: asyncio.Semaphore,
    url: str,
) -> dict:
    """Fetch a URL while holding a semaphore slot."""
    async with semaphore:  # at most max_concurrent coroutines get past this point at once
        response = await client.get(url)
        response.raise_for_status()
        return response.json()

async def batch_fetch(
    urls: list[str],
    max_concurrent: int = 20,
) -> list[dict | BaseException]:
    """
    Fetch many URLs concurrently, but cap in-flight requests at max_concurrent.
    Without the semaphore, all 1000 requests would launch simultaneously,
    potentially overwhelming the target service.
    """
    semaphore = asyncio.Semaphore(max_concurrent)
    async with httpx.AsyncClient() as client:
        tasks = [
            fetch_with_semaphore(client, semaphore, url)
            for url in urls
        ]
        # return_exceptions=True: failed fetches come back as exception objects
        return await asyncio.gather(*tasks, return_exceptions=True)

# Usage: fetch 1000 URLs with at most 20 concurrent requests
urls = [f"https://api.example.com/item/{i}" for i in range(1000)]
results = asyncio.run(batch_fetch(urls, max_concurrent=20))
```
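To confirm the cap without a real server, this sketch replaces the HTTP call with asyncio.sleep(). It records how many requests are inside the semaphore at the same time. bounded_fanout and fake_request are illustrative names.

```python
import asyncio

async def bounded_fanout(n_tasks: int, max_concurrent: int) -> int:
    """Run n_tasks fake requests through one semaphore and return the
    largest number that were in flight at the same moment."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def fake_request(i: int) -> int:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for network I/O
            in_flight -= 1
        return i

    await asyncio.gather(*(fake_request(i) for i in range(n_tasks)))
    return peak

peak = asyncio.run(bounded_fanout(n_tasks=200, max_concurrent=20))
print(peak)  # 20: never more than max_concurrent in flight
```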
Sliding-Window Rate Limiting
For APIs with per-second rate limits (e.g., "100 requests per second"):
```python
import asyncio
import time
from collections import deque

class RateLimiter:
    """
    Sliding-window rate limiter for async code: allows at most `rate`
    acquisitions in any `period`-second window, by remembering the
    timestamp of each recent acquisition.
    """

    def __init__(self, rate: int, period: float = 1.0):
        self.rate = rate
        self.period = period
        self._request_times: deque[float] = deque()
        # Only one waiter inspects the window at a time; the others queue on the lock
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        """Wait until a request slot is available."""
        async with self._lock:
            while True:
                now = time.monotonic()
                # Drop timestamps that have left the window
                while self._request_times and self._request_times[0] <= now - self.period:
                    self._request_times.popleft()
                if len(self._request_times) < self.rate:
                    break
                # Sleep until the oldest request leaves the window, then re-check
                # (asyncio.sleep may wake marginally early)
                await asyncio.sleep(self._request_times[0] + self.period - now)
            self._request_times.append(time.monotonic())

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, *args):
        pass

# Usage
limiter = RateLimiter(rate=100, period=1.0)  # 100 req/s

async def rate_limited_fetch(client, url):
    async with limiter:
        return await client.get(url)
```
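The RateLimiter above counts requests in a sliding window. A token bucket is a different, commonly used algorithm: it allows short bursts up to a fixed capacity while still enforcing the average rate. Below is a minimal sketch, not a library API; TokenBucket, rate, and capacity are illustrative names.

```python
import asyncio
import time

class TokenBucket:
    """
    Token bucket limiter (illustrative sketch): tokens refill continuously at
    `rate` per second up to `capacity`, and each acquire() spends one token.
    """

    def __init__(self, rate: float, capacity: int) -> None:
        self.rate = rate
        self.capacity = capacity
        self._tokens = float(capacity)  # start full: an initial burst is allowed
        self._last = time.monotonic()
        self._lock = asyncio.Lock()  # serialises waiters so tokens are not double-spent

    def _refill(self) -> None:
        now = time.monotonic()
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now

    async def acquire(self) -> None:
        async with self._lock:
            self._refill()
            while self._tokens < 1:
                # Sleep just long enough for one token to accumulate, then re-check
                await asyncio.sleep((1 - self._tokens) / self.rate)
                self._refill()
            self._tokens -= 1

    async def __aenter__(self) -> "TokenBucket":
        await self.acquire()
        return self

    async def __aexit__(self, *exc_info) -> None:
        return None

async def demo() -> float:
    bucket = TokenBucket(rate=50, capacity=10)
    start = time.monotonic()
    for _ in range(20):
        async with bucket:
            pass
    return time.monotonic() - start

elapsed = asyncio.run(demo())
print(f"{elapsed:.2f}s")
```

With rate=50 and capacity=10, the first 10 acquisitions are immediate and the next 10 are spaced about 20ms apart. The 20 acquisitions therefore take roughly 0.2s.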
Section 5: asyncio.gather vs asyncio.TaskGroup vs Sequential
Three patterns for running multiple coroutines. Each has different semantics for error handling.
Sequential (Baseline)
```python
import asyncio
import time

async def fetch_weather(city: str) -> dict:
    await asyncio.sleep(1.0)  # simulates 1-second I/O
    return {"city": city, "temp": 22.0}

async def get_dashboard_sequential(cities: list[str]) -> list[dict]:
    """Sequential: 10 cities × 1s = 10 seconds total."""
    results = []
    for city in cities:
        result = await fetch_weather(city)
        results.append(result)
    return results

cities = ["London", "Paris", "Tokyo", "NYC", "Sydney",
          "Mumbai", "Berlin", "Cairo", "Lagos", "Seoul"]
start = time.perf_counter()
results = asyncio.run(get_dashboard_sequential(cities))
print(f"Sequential: {time.perf_counter() - start:.2f}s")  # 10.02s
```
asyncio.gather - Concurrent, Errors Do Not Cancel Siblings
```python
async def get_dashboard_gather(cities: list[str]) -> list[dict]:
    """Concurrent: 10 cities, all run simultaneously = ~1 second total."""
    tasks = [fetch_weather(city) for city in cities]
    return await asyncio.gather(*tasks)

start = time.perf_counter()
results = asyncio.run(get_dashboard_gather(cities))
print(f"gather: {time.perf_counter() - start:.2f}s")  # 1.01s
```
Error handling with asyncio.gather:
```python
# return_exceptions=False (default): the first exception propagates out of gather,
# but the other tasks are NOT cancelled - they keep running in the background
# return_exceptions=True: exceptions are returned as values, not raised
async def get_dashboard_resilient(cities: list[str]) -> list[dict]:
    tasks = [fetch_weather(city) for city in cities]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    successes = []
    for city, result in zip(cities, results):  # gather preserves input order
        if isinstance(result, BaseException):
            print(f"Failed to fetch {city}: {result}")
        else:
            successes.append(result)
    return successes
```
asyncio.TaskGroup - Structured Concurrency (Python 3.11+)
```python
import asyncio

async def get_dashboard_taskgroup(cities: list[str]) -> list[dict]:
    """
    TaskGroup: if ANY task raises, ALL sibling tasks are cancelled.
    This is structured concurrency - tasks cannot outlive their scope.
    """
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_weather(city)) for city in cities]
    # Reaching this line means every task succeeded. If one had raised, its
    # siblings would have been cancelled and the async with block would have
    # raised an ExceptionGroup instead.
    return [task.result() for task in tasks]
```
The difference from gather:
| Behaviour | gather | TaskGroup |
|---|---|---|
| Exception from one task | Raised to the caller (or returned as a value with return_exceptions=True); other tasks continue | Cancels all siblings; errors raised as an ExceptionGroup |
| Tasks can outlive the scope | Yes (siblings keep running after gather raises) | No - they are cancelled on exit |
| Timeout per task | Manual with asyncio.wait_for | Manual with asyncio.wait_for |
| Python version | All supported versions | Python 3.11+ |
| Preferred for new, structured code | Less structured | Yes - recommended modern approach |
Timing Comparison
```python
import asyncio
import time

async def slow_io(duration: float, name: str) -> str:
    await asyncio.sleep(duration)
    return f"Done: {name}"

async def benchmark():
    tasks_args = [(1.0, f"task-{i}") for i in range(10)]

    # Sequential
    start = time.perf_counter()
    for duration, name in tasks_args:
        await slow_io(duration, name)
    print(f"Sequential: {time.perf_counter() - start:.2f}s")

    # gather
    start = time.perf_counter()
    await asyncio.gather(*[slow_io(d, n) for d, n in tasks_args])
    print(f"gather: {time.perf_counter() - start:.2f}s")

    # TaskGroup
    start = time.perf_counter()
    async with asyncio.TaskGroup() as tg:
        for d, n in tasks_args:
            tg.create_task(slow_io(d, n))
    print(f"TaskGroup: {time.perf_counter() - start:.2f}s")

asyncio.run(benchmark())
```

```text
Sequential: 10.01s
gather: 1.01s
TaskGroup: 1.01s
```
Both concurrent approaches have essentially the same performance, because they schedule the same tasks on the same event loop. TaskGroup is preferred in new code for correctness (structured concurrency).
Section 6: Backpressure with asyncio.Queue
Without backpressure, a fast producer and slow consumer leads to unbounded queue growth - eventually OOM. asyncio.Queue with maxsize implements backpressure: the producer blocks (awaits) when the queue is full.
The Unbounded Queue Problem
```python
import asyncio

# BAD: unbounded queue - producer runs at full speed
async def bad_pipeline():
    queue = asyncio.Queue()  # no maxsize - unlimited growth

    async def producer():
        for i in range(1_000_000):
            await queue.put(i)  # never blocks (or even yields) on an unbounded queue
        # Problem: if the consumer is slow, the queue fills memory

    async def consumer():
        while True:  # never exits: this demo runs until cancelled
            item = await queue.get()
            await asyncio.sleep(0.001)  # slow consumer
            queue.task_done()

    await asyncio.gather(producer(), consumer())
```
Bounded Queue with Backpressure
```python
import asyncio

async def process_item(item):
    """Placeholder for the real per-item work (hypothetical)."""
    await asyncio.sleep(0.001)
    return item

async def producer(
    queue: asyncio.Queue,
    items: list,
) -> None:
    """
    Produce items into a bounded queue.
    If the queue is full (maxsize reached), await queue.put() blocks
    until the consumer has processed something - automatic backpressure.
    """
    for item in items:
        await queue.put(item)  # blocks if queue is full
    await queue.put(None)  # sentinel to signal completion

async def consumer(
    queue: asyncio.Queue,
    n_workers: int = 4,
) -> list:
    """
    Consume items from the queue with N concurrent workers.
    Each worker processes one item at a time.
    """
    results = []

    async def worker():
        while True:
            item = await queue.get()
            if item is None:
                queue.task_done()
                await queue.put(None)  # re-put the sentinel so the other workers also stop
                break
            try:
                result = await process_item(item)
                results.append(result)
            finally:
                queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(n_workers)]
    await asyncio.gather(*workers)
    return results  # completion order, not input order

async def bounded_pipeline(items: list, max_queue_size: int = 100) -> list:
    """
    Bounded producer-consumer pipeline.
    max_queue_size controls peak memory usage.
    Producer will stall if consumer falls behind.
    """
    queue = asyncio.Queue(maxsize=max_queue_size)
    producer_task = asyncio.create_task(producer(queue, items))
    consumer_result = await consumer(queue, n_workers=8)
    await producer_task
    return consumer_result
```
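A reduced version of the pipeline makes the backpressure observable. It uses one slow consumer, a queue of size 10, and a producer that records the largest queue size it sees after each put. demo_backpressure is an illustrative name.

```python
import asyncio

async def demo_backpressure(n_items: int = 200, maxsize: int = 10) -> tuple[int, int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    peak_size = 0
    processed = 0

    async def producer() -> None:
        nonlocal peak_size
        for i in range(n_items):
            await queue.put(i)  # suspends while the queue is full
            peak_size = max(peak_size, queue.qsize())
        await queue.put(None)  # sentinel: no more items

    async def consumer() -> None:
        nonlocal processed
        while (item := await queue.get()) is not None:
            await asyncio.sleep(0.001)  # slow consumer
            processed += 1

    await asyncio.gather(producer(), consumer())
    return peak_size, processed

peak, done = asyncio.run(demo_backpressure())
print(peak, done)  # peak never exceeds maxsize (10); all 200 items are processed
```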
Queue as a Retry Buffer
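```python
import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class WorkItem:
    payload: Any
    attempts: int = 0
    max_attempts: int = 3

async def process_item(payload: Any) -> None:
    """Placeholder for the real per-item work (hypothetical)."""
    await asyncio.sleep(0.01)

async def reliable_processor(items: list[Any], n_workers: int = 4) -> None:
    """
    Process items with automatic retry on failure.
    Failed items are re-queued after a delay (exponential backoff).
    Completion is detected with queue.join(), which returns once every
    put() has been matched by a task_done(), so retries are never lost.
    """
    queue: asyncio.Queue[WorkItem] = asyncio.Queue(maxsize=500)
    retry_tasks: set[asyncio.Task] = set()  # strong references to pending retries

    async def requeue_after(work_item: WorkItem, delay: float) -> None:
        await asyncio.sleep(delay)
        await queue.put(work_item)  # counts as a new unit of work
        queue.task_done()           # retire the original get() only after re-queueing

    async def worker() -> None:
        while True:
            work_item = await queue.get()
            try:
                await process_item(work_item.payload)
            except Exception as e:
                work_item.attempts += 1
                if work_item.attempts < work_item.max_attempts:
                    # Back off in a separate task so this worker keeps consuming
                    # (workers blocked on put() into a full queue could deadlock)
                    task = asyncio.create_task(requeue_after(work_item, 2 ** work_item.attempts))
                    retry_tasks.add(task)
                    task.add_done_callback(retry_tasks.discard)
                    continue  # requeue_after() calls task_done() for this item
                print(f"Giving up on {work_item.payload} after {work_item.attempts} attempts: {e}")
            queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(n_workers)]
    for item in items:
        await queue.put(WorkItem(payload=item))  # blocks while the queue is full
    await queue.join()  # every item has succeeded or been given up on
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
```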
Section 7: CPU-Bound Work in Async
Async does not help with CPU-bound work - running a CPU-intensive function in a coroutine blocks the event loop for its entire duration. The right tools are ThreadPoolExecutor (for I/O-bound blocking code) and ProcessPoolExecutor (for CPU-bound code).
loop.run_in_executor - Offloading Blocking Work
```python
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import requests  # synchronous HTTP library (for illustration)

# Shared executors - create once, reuse
_thread_pool = ThreadPoolExecutor(max_workers=20)
_process_pool = ProcessPoolExecutor(max_workers=8)

async def run_in_thread(func, *args, **kwargs):
    """
    Run a blocking I/O function in a thread pool.
    The event loop continues processing other coroutines while this runs.
    Use for: legacy sync code, blocking database drivers, file I/O,
    third-party clients without async support.
    """
    loop = asyncio.get_running_loop()
    # run_in_executor only forwards positional arguments; functools.partial carries keyword arguments
    if kwargs:
        func = functools.partial(func, **kwargs)
    return await loop.run_in_executor(_thread_pool, func, *args)

async def run_in_process(func, *args, **kwargs):
    """
    Run a CPU-bound function in a process pool.
    Bypasses the GIL - true parallelism for CPU-intensive work.
    The function and its arguments are pickled, so func must be defined at module level.
    Use for: image processing, ML inference, large data transformation,
    cryptographic operations.
    """
    loop = asyncio.get_running_loop()
    if kwargs:
        func = functools.partial(func, **kwargs)
    return await loop.run_in_executor(_process_pool, func, *args)

# Examples
def blocking_http_call(url: str) -> dict:
    """Synchronous HTTP call - would block the event loop if called directly."""
    response = requests.get(url, timeout=10)
    return response.json()

def cpu_intensive_computation(data: list[float]) -> float:
    """CPU-bound work - needs ProcessPoolExecutor."""
    return sum(x ** 2.0 for x in data) ** 0.5

# `app` is the FastAPI instance from the earlier examples
@app.get("/data")
async def get_data():
    # Blocking HTTP - run in thread pool
    external_data = await run_in_thread(blocking_http_call, "http://slow-api.internal/data")
    # CPU-heavy - run in process pool
    score = await run_in_process(cpu_intensive_computation, external_data["values"])
    return {"score": score}
```
asyncio.to_thread() - Python 3.9+
Simpler API for thread pool offloading:
```python
import asyncio

def slow_sync_operation(data: bytes) -> dict:
    """Any blocking synchronous function."""
    import json
    return json.loads(data.decode())

async def handler():
    raw = b'{"key": "value", "count": 42}'
    # asyncio.to_thread handles the executor boilerplate
    result = await asyncio.to_thread(slow_sync_operation, raw)
    return result
```
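Offloading matters because it keeps the loop free for everything else. In this sketch, a heartbeat task ticks every 10ms while a 0.2s blocking call runs, either inline or through asyncio.to_thread(), and the ticks are counted. heartbeat and measure are illustrative names.

```python
import asyncio
import time

async def heartbeat(ticks: list[float], stop: asyncio.Event) -> None:
    # Records a timestamp roughly every 10ms, as long as the loop is free to run it
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)

async def measure(offload: bool) -> int:
    ticks: list[float] = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat take its first tick
    if offload:
        await asyncio.to_thread(time.sleep, 0.2)  # blocking call in a worker thread
    else:
        time.sleep(0.2)  # blocking call on the event loop thread
    stop.set()
    await hb
    return len(ticks)

inline_ticks = asyncio.run(measure(offload=False))
threaded_ticks = asyncio.run(measure(offload=True))
print("inline:", inline_ticks)       # 1: the loop was frozen for the whole 0.2s
print("to_thread:", threaded_ticks)  # roughly 20, depending on timer granularity
```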
Choosing the Right Executor
| Work Type | Executor | Why |
|---|---|---|
| Blocking I/O (sync HTTP, files) | ThreadPoolExecutor | Releases GIL during I/O - threads work |
| Legacy sync DB driver | ThreadPoolExecutor | Same - I/O bound |
| CPU-heavy Python computation | ProcessPoolExecutor | Bypasses GIL via separate processes |
| NumPy / SciPy operations | ThreadPoolExecutor | Many NumPy/SciPy routines release the GIL internally |
| Image/video processing (OpenCV) | ProcessPoolExecutor | CPU-bound, benefits from process parallelism |
| ML model inference (CPU) | ProcessPoolExecutor | CPU-bound |
| ML model inference (GPU) | ThreadPoolExecutor | Frameworks typically release the GIL while the GPU works; process overhead is wasteful |
Section 8: Profiling Async Code
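Standard profiling tools (cProfile, py-spy) work with async code, but their output is confusing because a suspended coroutine is not on the thread's call stack. A sampling profiler only sees whichever coroutine happens to be running at that instant. cProfile counts every resume as a new call, so the time a task spends waiting on I/O is invisible or fragmented. Dedicated async profiling tools exist.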
asyncio.all_tasks() - Inspecting Running Tasks
```python
import asyncio

async def diagnose_stuck_tasks() -> None:
    """
    Print where every other pending task is currently suspended.
    Useful for finding tasks stuck on an await (a hung network call, a lock that
    is never released). It cannot catch a coroutine that never awaits: while such
    a coroutine runs, the loop cannot run this diagnostic either.
    """
    current = asyncio.current_task()
    for task in asyncio.all_tasks():
        if task is current or task.done():
            continue
        # Get the coroutine's current call stack (frames ordered oldest to newest)
        stack = task.get_stack()
        if not stack:
            continue
        frame = stack[-1]  # innermost frame: where the task is suspended
        print(
            f"Task: {task.get_name()}\n"
            f"  at: {frame.f_code.co_filename}:{frame.f_lineno}\n"
            f"  in: {frame.f_code.co_name}\n"
        )

# Schedule periodic task diagnosis
async def watchdog(interval: float = 10.0) -> None:
    while True:
        await asyncio.sleep(interval)
        await diagnose_stuck_tasks()
```
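A task listing cannot catch a coroutine that never yields, because the watchdog itself needs the loop in order to run. A complementary pattern is to measure event-loop lag: sleep for a fixed interval and record how late the wake-up arrives. This is a common technique, not an asyncio API, and lag_monitor is an illustrative name.

```python
import asyncio
import time

async def lag_monitor(samples: list[float], interval: float = 0.05) -> None:
    """Record how much later than requested each sleep wakes up.
    Large values mean something blocked the loop since the previous sample."""
    while True:
        start = time.perf_counter()
        await asyncio.sleep(interval)
        samples.append(time.perf_counter() - start - interval)

async def main() -> list[float]:
    samples: list[float] = []
    monitor = asyncio.create_task(lag_monitor(samples))
    await asyncio.sleep(0.12)  # healthy period: lag stays near zero
    time.sleep(0.3)            # simulate a blocking call on the loop thread
    await asyncio.sleep(0.12)
    monitor.cancel()
    try:
        await monitor
    except asyncio.CancelledError:
        pass
    return samples

samples = asyncio.run(main())
print(f"max lag: {max(samples) * 1000:.0f}ms")  # hundreds of ms, vs ~0ms while healthy
```

In production, the same loop would log or export the lag as a metric instead of collecting it in a list.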
aiomonitor - Live Async Monitor
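```shell
pip install aiomonitor
```

```python
import asyncio
import aiomonitor

async def run_app():
    ...  # your application code (placeholder)

async def main():
    # The monitor must be attached to the loop that is actually running,
    # so start it inside the coroutine that asyncio.run() executes
    loop = asyncio.get_running_loop()
    with aiomonitor.start_monitor(loop):
        await run_app()

if __name__ == "__main__":
    asyncio.run(main())

# Connect to the monitor:
# nc localhost 50101   (the default port has differed between aiomonitor releases;
#                       check the docs for your version)
# Available commands: ps, cancel, where <id>, stacktrace
```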
aiomonitor attaches a telnet server to the running event loop. You can list tasks, inspect their stacks, and cancel them without stopping the service.
Finding a Task That Never Yields
A common bug: a coroutine does CPU-heavy work in a loop without awaiting, starving all other coroutines.
```python
import asyncio
import time

# BAD: CPU-bound loop without awaiting - blocks everything for 30 seconds
async def malicious_coroutine():
    start = time.time()
    result = 0
    while time.time() - start < 30:
        result += sum(x**2 for x in range(10_000))  # CPU work, no await
    return result

# Detecting it:
async def detect_slow_callbacks():
    """
    loop.slow_callback_duration sets the threshold for logging warnings about
    slow callbacks. The warnings are only emitted when the loop runs in debug
    mode: asyncio.run(main(), debug=True), PYTHONASYNCIODEBUG=1, or python -X dev.
    """
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.1  # warn if any callback takes > 100ms
    # In debug mode, asyncio will log warnings like:
    # "Executing <Task ... coro=<malicious_coroutine()>> took 30.2 seconds"
```
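Here is a self-contained way to see the warning, using only the standard library. The sketch runs the loop in debug mode, blocks it for 200ms inside a coroutine, and captures what the asyncio logger emits. ListHandler and hog are illustrative names.

```python
import asyncio
import logging
import time

class ListHandler(logging.Handler):
    """Collects log messages in memory (for demonstration)."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())

async def hog() -> None:
    time.sleep(0.2)  # blocks the loop for 200ms without awaiting

async def main() -> None:
    asyncio.get_running_loop().slow_callback_duration = 0.1  # threshold in seconds
    await hog()

handler = ListHandler()
logging.getLogger("asyncio").addHandler(handler)
asyncio.run(main(), debug=True)  # slow-callback warnings are only emitted in debug mode
print(handler.messages)  # e.g. ['Executing <Task ... coro=<main() ...>> took 0.200 seconds']
```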
pyinstrument for Async Profiles
```python
import asyncio
from pyinstrument import Profiler

profiler = Profiler(async_mode="enabled")

async def run_application():
    ...  # your application code (placeholder)

async def main():
    with profiler:
        await run_application()

asyncio.run(main())
profiler.open_in_browser()
```
pyinstrument with async_mode='enabled' correctly handles async code - it profiles across await boundaries and shows the time tasks spend waiting vs. running.
Production Patterns Reference
Startup and Shutdown Checklist
```python
from contextlib import asynccontextmanager
import asyncio

import httpx
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/mydb"

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Production-grade lifespan: initialise all shared resources once."""
    # 1. HTTP client pool
    app.state.http_client = httpx.AsyncClient(
        limits=httpx.Limits(
            max_connections=200,
            max_keepalive_connections=50,
        ),
        # httpx.Timeout needs a default unless all four values are given
        timeout=httpx.Timeout(10.0, connect=2.0),
    )
    # 2. Database engine
    app.state.db_engine = create_async_engine(
        DATABASE_URL,
        pool_size=20,
        max_overflow=10,
        pool_pre_ping=True,
    )
    # 3. Semaphore for external API rate limiting (shared by all requests)
    app.state.api_semaphore = asyncio.Semaphore(50)
    # 4. Background work queue (bounded)
    app.state.work_queue = asyncio.Queue(maxsize=500)
    yield
    # Shutdown - reverse order of creation
    await app.state.db_engine.dispose()
    await app.state.http_client.aclose()

app = FastAPI(lifespan=lifespan)
```
Interview Questions
Q1: The asyncio event loop is single-threaded. How does it handle 1000 concurrent HTTP requests without running out of threads?
The event loop does not use a thread per connection. Instead, it uses OS-level I/O multiplexing: epoll on Linux, kqueue on macOS/BSD, IOCP on Windows. A single epoll_wait() call can monitor thousands of file descriptors simultaneously, returning immediately with the list of descriptors that are ready (have data to read or can accept writes).
The flow for 1000 concurrent HTTP requests:
- 1000 sockets are opened and registered with epoll (once each, via epoll_ctl)
- epoll_wait() is called, watching all 1000 file descriptors at once
- When data arrives on any socket, epoll_wait() returns a list of ready fds
- The event loop executes the callbacks for the ready sockets (calling recv() on each)
- Completed callbacks resume the waiting coroutines (await response)
- epoll_wait() is called again for the sockets that are still waiting
The key: blocking on epoll_wait() is cheap, because one system call waits on all fds. No thread is allocated per connection; the event loop thread serves every connection via callbacks. This is why asyncio can handle tens of thousands of concurrent connections with modest memory overhead. A task costs a few KB, whereas each thread reserves its own stack, commonly megabytes of address space.
Q2: What is connection pool exhaustion and how would you diagnose and fix it in a production FastAPI service?
Pool exhaustion occurs when all connections in the pool are checked out simultaneously and a new request must wait for one to be returned. If the wait exceeds pool_timeout, a PoolTimeout exception is raised - the request fails.
Diagnosis:
- Error logs: httpx.PoolTimeout or sqlalchemy.exc.TimeoutError spikes during load
- Pool metrics: pool.checkedout() consistently equals pool_size + max_overflow
- p99 latency increases dramatically under load (pool wait time is included in request latency)
Root causes and fixes:
- pool_size is too small: increase it. A rule of thumb for a starting point is pool_size ≈ (peak requests in flight per process) × (concurrent DB queries each request holds). Each worker process has its own pool, so the total across all processes (pool_size + max_overflow, times the process count) must stay below the database's connection limit.
- Connections not being returned: a session opened without a context manager (session = AsyncSessionLocal()) and never closed on the error path keeps its connection checked out. Fix by always using async with AsyncSessionLocal() as session:, or a try/finally that awaits session.close().
- Slow queries holding connections: a query that takes 30 seconds holds a pool connection for 30 seconds. Diagnose with query timing logs. Fix with query optimisation, added indexes, or query timeouts.
- max_overflow too low: max_overflow allows temporary extra connections during peaks. If your traffic has sharp spikes, increasing max_overflow buys time for the spike to subside.
- Upstream service is slow: if the database itself is the bottleneck, adding connections only adds load to it and can make latency worse. Address the root cause.
Q3: When should you use asyncio.gather vs asyncio.TaskGroup? Is there a performance difference?
There is no meaningful performance difference - both schedule tasks on the same event loop and both run them concurrently. The difference is in error handling semantics and code structure.
asyncio.gather with return_exceptions=False (default) propagates the first exception raised, but does not cancel the other tasks. Tasks that were started continue running even after the exception propagates to the caller. This can cause background tasks to run after their results are no longer needed - wasting resources and potentially causing confusing side effects.
asyncio.TaskGroup (Python 3.11+) implements structured concurrency. If any task raises an exception, all sibling tasks are cancelled, and the errors are re-raised together as an ExceptionGroup (handled with except*). The async with TaskGroup() as tg: block does not exit until all tasks are complete (or cancelled), so no task can outlive its scope.
Use asyncio.TaskGroup when:
- You want "all succeed or all fail" semantics
- You want strong guarantees that tasks do not outlive their scope
- You are writing new code targeting Python 3.11+
Use asyncio.gather when:
- You need return_exceptions=True (collect all results including exceptions)
- You need compatibility with Python < 3.11
- You want tasks to continue even if one fails (some dashboards, batch collectors)
Q4: Describe the correct way to run a CPU-bound function from an async handler. What goes wrong if you call it directly in the coroutine?
Calling a CPU-bound function directly in a coroutine blocks the event loop thread for the function's entire duration. During this time, the event loop cannot process any other coroutines - no other requests are handled, no I/O callbacks fire, and asyncio.sleep() calls in other tasks do not resume. The service is effectively frozen.
The correct approach for CPU-bound work is loop.run_in_executor() with a ProcessPoolExecutor:
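```python
import asyncio

# process_pool (a ProcessPoolExecutor) and cpu_heavy_function are assumed
# to be defined at module level
async def handler(data: list[float]) -> float:
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        process_pool,        # ProcessPoolExecutor - bypasses the GIL
        cpu_heavy_function,  # the function to run
        data,                # its argument
    )
    return result
```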
run_in_executor runs the function in a worker process (ProcessPoolExecutor; arguments and the return value are pickled across the process boundary) or a worker thread (ThreadPoolExecutor). Meanwhile, the event loop continues serving other requests. When the work completes, the event loop is notified and the coroutine resumes.
Important distinction: ThreadPoolExecutor is appropriate for blocking I/O-bound code (legacy synchronous HTTP clients, blocking database drivers) but does NOT bypass the GIL for CPU-bound Python code. For CPU-intensive Python, use ProcessPoolExecutor. For CPU-intensive NumPy/Cython/Numba code (which releases the GIL internally), ThreadPoolExecutor is sufficient and has lower overhead than spawning processes.
Q5: You are building a service that calls 50 external APIs per request. With asyncio.gather, all 50 calls start simultaneously. What are the risks and how would you control them?
Risk 1 - Overwhelming the downstream services: 1000 req/s × 50 calls per request = 50,000 outbound calls per second. If each call takes around a second, that is roughly 50,000 calls in flight at once (concurrency ≈ arrival rate × call duration). Most external APIs have rate limits and connection caps, and exceeding them causes 429 (rate limit) or connection refused errors.
Risk 2 - Pool exhaustion: your connection pool has a fixed number of connections. Serving that many simultaneous calls without queuing would need a pool on the order of 50,000 connections, which is not realistic.
Risk 3 - Memory spikes: 50,000 in-flight responses, each holding a response buffer in memory, can cause OOM under load.
Controls:
- asyncio.Semaphore: cap concurrent calls to each service. semaphore = asyncio.Semaphore(10) limits calls to a given API to 10 at a time. This only works if the same semaphore object is shared by all requests (created once, e.g. in the lifespan), not created per request.
- Staggered fanout: do not fire all 50 calls simultaneously. Process them in batches of 10 with a small delay between batches.
- Timeouts per call: asyncio.wait_for(fetch(), timeout=2.0) ensures slow upstream APIs do not hold connections indefinitely.
- Circuit breaker: if a service returns errors for 5 consecutive calls, stop sending to it for 30 seconds. Libraries like aiobreaker or circuitbreaker implement this pattern.
- Correct pool sizing: set max_connections in the HTTP client to match your concurrency target. With a shared Semaphore(10) per service, peak concurrent calls to any one service is 10 regardless of request rate, so a max_keepalive_connections of around 20 is ample.
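The semaphore and per-call timeout controls combine naturally. In this sketch, call_api is a hypothetical stand-in for a downstream request. Fifty calls share one Semaphore(10), and each call is capped at 0.1s with asyncio.wait_for. return_exceptions=True keeps the five slow calls from failing the whole fan-out.

```python
import asyncio

async def call_api(i: int, latency: float) -> int:
    """Hypothetical downstream call that answers after `latency` seconds."""
    await asyncio.sleep(latency)
    return i

async def guarded_call(semaphore: asyncio.Semaphore, i: int, latency: float,
                       timeout: float) -> int:
    async with semaphore:  # cap concurrency to the service
        return await asyncio.wait_for(call_api(i, latency), timeout)  # cap time per call

async def fan_out() -> list:
    semaphore = asyncio.Semaphore(10)  # one semaphore shared by every call to this service
    latencies = [0.01] * 45 + [1.0] * 5  # five slow calls out of fifty
    return await asyncio.gather(
        *(guarded_call(semaphore, i, lat, timeout=0.1) for i, lat in enumerate(latencies)),
        return_exceptions=True,  # a timeout comes back as a value instead of raising
    )

results = asyncio.run(fan_out())
ok = [r for r in results if not isinstance(r, BaseException)]
timed_out = [r for r in results if isinstance(r, asyncio.TimeoutError)]
print(len(ok), len(timed_out))  # 45 5
```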
